MQTT系列最终章 |
您所在的位置:网站首页 › python paho mqtt重连回调 › MQTT系列最终章 |
写在前面 通过之前MQTT系列-Eclipse.Paho源码分析(二)-消息的发送与接收的介绍,相信仔细阅读过的小伙伴已经对Eclipse.Paho内部发送和订阅消息的流程有了一个较为清晰的认识,今天我们就把剩下的边角料扫一扫,也就是Eclipse.Paho作为客户端是如何进行容灾补偿和心跳的相关介绍。 心跳机制 首先了解一下在MQTT协议中心跳请求和响应是如何规定的。下面是官方文档中的描述:简单来说,就是在创建连接时发送的CONNECT控制报文中,在第九和第十字节会携带客户端与服务端的最大连接时长,如果超过这个时间,服务端和客户端会各自做一些相应的处理。但是这样会有一个问题,当客户端在超过最大连接时长的时间段内确实没有消息上送至服务器,此时服务器是无法判断因为客户端出现故障导致的还是确实没有收到消息导致的。所以MQTT协议中规定了PINGREQ和PINGRESP两种控制类型的报文用来处理上述情况,即如果客户端真的没有消息上送,你也要定时给我发送一个PINGREQ类型的报文告诉我你还活着,我服务器收到后会即使回送一个PINGRESP报文告诉客户端我收到了,这就是一条心跳消息。 下面我们来看看Eclipse.Paho的实现:初始化心跳消息发送器默认eclipse paho提供了两种MqttPingSender的实现:TimerPingSender:使用了Java的原生定时工具Timer ScheduledExecutorPingSender:基于线程池的定时任务调 之前说过我们在创建连接时首先会创建一个异步的客户端连接对象MqttAsyncClient,在创建MqttAsyncClient时,会默认将心跳消息发送器TimerPingSender创建出来,也就是说系统默认使用的就是TimerPingSender,具体代码如下public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence) throws MqttException { // 构造方法中创建 TimerPingSender this(serverURI, clientId, persistence, new TimerPingSender()); }我们来分析一下这个类,这个类实现了MqttPingSender接口,接口中提供了5个方法 public interface MqttPingSender { /** * Initial method. Pass interal state of current client in. * @param comms The core of the client, which holds the state information for pending and in-flight messages. */ // 初始化心跳发送器 void init(ClientComms comms); /** * Start ping sender. It will be called after connection is success. */ // 心跳开始 void start(); /** * Stop ping sender. It is called if there is any errors or connection shutdowns. */ // 心跳终止 void stop(); /** * Schedule next ping in certain delay. * @param delayInMilliseconds delay in milliseconds. */ // 触发下一次心跳 void schedule(long delayInMilliseconds);}init() 心跳初始化 客户端与服务端建立连接后,服务端会响应一个CONNACK类型的报文,所以在消息接收线程中,如果判断是这种类型的报文,会创建Timer并开始心跳 protected void notifyReceivedAck(MqttAck ack) throws MqttException { if (token == null) { ... } else if (ack instanceof MqttPubRec) { ... } else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) { ... } else if (ack instanceof MqttPingResp) { ... } else if (ack instanceof MqttConnack) { int rc = ((MqttConnack) ack).getReturnCode(); if (rc == 0) { synchronized (queueLock) { if (cleanSession) { clearState(); // Add the connect token back in so that users can be // notified when connect completes. tokenStore.saveToken(token,ack); } inFlightPubRels = 0; actualInFlight = 0; restoreInflightMessages(); // 开启PingSender connected(); } } ... }PingOutStanding:表示待确认的PINGREQ的报文数量,即我们上一章提到的心跳出站计数器,当心跳消息发送成功后这个变量的值会增加1,收到PINGRESP确认报文后这个变量的值会减1,但是不会小于0 lastInboundActivity:表示客户端最近一次收到服务端的报文的时间 lastOutboundActivity:表示客户端最近一次成功发送报文的时间 delta:考虑到System.currentTimeMillis()不精确,paho通过增加一个缓冲时间来减少ping包的发送频率。默认为100000微妙 下面说下处理心跳的具体流程: 如果已经连接到服务器,且心跳时间keepAlive的值大于0,则会尝试走ping报文发送流程。 先判断是否出现异常,异常有两种情况: 发送PINGREQ报文后(pingOutStanding > 0),在 keepAlive + delta 时间间隔内未从服务端收到报文,则会抛出超时异常,抛出异常后,客户端会断开和服务端的连接 如果在 2倍的keepAlive时间间隔内,客户端既没有成功发送过PINGREQ报文也没有成功发送过其他报文,客户端认为到broker的连接已经断开,则会抛出超时异常,抛出异常后,客户端会断开和服务端的连接 未抛出异常,正常发送,计算下次ping的时间间隔 如果在 keepAlive - delta 时间间隔内既没法送过也没收到过心跳消息 或者 最近一次消息(不一定是心跳消息)时间间隔超过 keepAlive - delta ,则向broker发送PINGREQ报文,否则执行2 条件1满足表示暂时不需要发送 PINGREQ 报文,经过 Math.max(1, getKeepAlive() - (time - lastOutboundActivity))时间后再检查。 总结MQTT协议没有对客户端的实现进行详细的规定,只是说明客户端在发送PINGREQ报文后,如果在合理的时间内仍没有收到PINGRESP报文,客户端应该断开和服务端的网络连接。根据这一规则,客户端可以根据自己的需求进行具体的实现,paho对”合理时间"的设置为 keepAlive + delta 和 2 * keepAlive,并根据"合理时间"的设置考虑了两种异常情况:发送了PINGREQ报文,但在 keepAlive + delta 内未收到服务端的任何数据包 在2 * keepAlive 内未成功发送过任何数据包 出现以上两种异常情况后,客户端会主动断开和服务端的连接,排除两种异常情况,将向服务端发送PINGREQ的最小时间间隔设置为keepAlive - delta。重连与消息重发机制 重连机制重连机制在许多中间件中都有使用,例如分布式协调组件ZooKeeper,数据库连接池Druid等,可以说只要涉及到C/S设计架构的,都逃避不开客户端断开连接时与服务端的重新连接问题。所以本小节我们就来看一下Eclipse.Paho当出现连接异常时,会调用ClientComms的shutdownConnection()方法断开连接,在shutdownConnection()方法中主要是做一些清理资源的动作,例如将发送和接收线程停止,断开socket等,之后触发MqttCallBack进行回调处理,这里用到的回调是MqttReconnectCallback,其重连操作核心方法是startReconnectCycle(),我们来看一下这个方法的具体实现private void startReconnectCycle() { reconnectTimer = new Timer("MQTT Reconnect: " + clientId); reconnectTimer.schedule(new ReconnectTask(), reconnectDelay); }在上述方法中会初始化一个Timer定时器,通过触发ReconnectTask执行重连逻辑,并且延时时间执行,默认时长为1s,也就是说,第一次重连尝试会在断开连接后1秒执行。在ReconnectTask中,主要做的就是进行一次连接操作,具体代码如下:在初始化连接对象阶段,会将消息从持久化存储恢复到出站队列 在建立连接后(收到CONNACK类型报文)根据消息类型将消息从出站队列放到发送队列或特殊消息队列中进行发送 下面看一下具体代码 在创建连接对象时会初始化ClientState对象,在ClientState的构造方法中,会从持久化存储中将消息加载出来恢复到出站队列中protected void restoreState() throws MqttException { Enumeration messageKeys = persistence.keys(); MqttPersistable persistable; String key; int highestMsgId = nextMsgId; Vector orphanedPubRels = new Vector(); // while循环内存中的所有key,并根据key取出对应的message. while (messageKeys.hasMoreElements()) { key = (String) messageKeys.nextElement(); persistable = persistence.get(key); // 将消息从持久化存储加载到内存中 MqttWireMessage message = restoreMessage(key, persistable); if (message != null) { // 将消息进行分类,根据messageid+前缀将消息放入不同的出站队列中 if (key.startsWith(PERSISTENCE_RECEIVED_PREFIX)) { inboundQoS2.put( Integer.valueOf(message.getMessageId()),message); } else if (key.startsWith(PERSISTENCE_SENT_PREFIX)) { MqttPublish sendMessage = (MqttPublish) message; highestMsgId = Math.max(sendMessage.getMessageId(), highestMsgId); if (persistence.containsKey(getSendConfirmPersistenceKey(sendMessage))) { MqttPersistable persistedConfirm = persistence.get(getSendConfirmPersistenceKey(sendMessage)); MqttPubRel confirmMessage = (MqttPubRel) restoreMessage(key, persistedConfirm); if (confirmMessage != null) { outboundQoS2.put( Integer.valueOf(confirmMessage.getMessageId()), confirmMessage); } else { } } else { sendMessage.setDuplicate(true); if (sendMessage.getMessage().getQos() == 2) { outboundQoS2.put( Integer.valueOf(sendMessage.getMessageId()),sendMessage); } else { outboundQoS1.put( Integer.valueOf(sendMessage.getMessageId()),sendMessage); } } MqttDeliveryToken tok = tokenStore.restoreToken(sendMessage); tok.internalTok.setClient(clientComms.getClient()); // 缓存到id使用映射表inUsedIds中 inUseMsgIds.put( Integer.valueOf(sendMessage.getMessageId()), Integer.valueOf(sendMessage.getMessageId())); } else if(key.startsWith(PERSISTENCE_SENT_BUFFERED_PREFIX)){ MqttPublish sendMessage = (MqttPublish) message; highestMsgId = Math.max(sendMessage.getMessageId(), highestMsgId); if(sendMessage.getMessage().getQos() == 2){ outboundQoS2.put( Integer.valueOf(sendMessage.getMessageId()),sendMessage); } else if(sendMessage.getMessage().getQos() == 1){ outboundQoS1.put( Integer.valueOf(sendMessage.getMessageId()),sendMessage); } else { outboundQoS0.put( Integer.valueOf(sendMessage.getMessageId()), sendMessage); persistence.remove(key); } MqttDeliveryToken tok = tokenStore.restoreToken(sendMessage); tok.internalTok.setClient(clientComms.getClient()); // 缓存到id使用映射表inUsedIds中 inUseMsgIds.put( Integer.valueOf(sendMessage.getMessageId()), Integer.valueOf(sendMessage.getMessageId())); } else if (key.startsWith(PERSISTENCE_CONFIRMED_PREFIX)) { MqttPubRel pubRelMessage = (MqttPubRel) message; if (!persistence.containsKey(getSendPersistenceKey(pubRelMessage))) { orphanedPubRels.addElement(key); } } } } messageKeys = orphanedPubRels.elements(); while(messageKeys.hasMoreElements()) { key = (String) messageKeys.nextElement(); persistence.remove(key); } nextMsgId = highestMsgId; }上面的代码中要注意的就是highedstMsgId这个变量,这个变量既是定义新发送消息的初始MessageId,也是被持久化的所有消息中的最大MessageId。这样做的目的是为了保证MessageId唯一。举个例子,如果有二十条消息持久化在内存中等待发送,其中最大的MessageId为100,那么将这二十条消息发送完成后,如果需要再发送新的消息,此时新消息的MessageId的初始值就不能从0开始,因为有可能会和持久化的消息中的MessageId重复,为了避免这种情况,所以会将持久化消息中messageId的最大值作为新发送消息的初始值来使用。 将消息加载在出站队列后,客户端就会和服务端建立连接,连接成功的动作为收到CONNACK类型报文,所以需要去看消费线程CommsReceiver。当收到ACK后,会调用ClientState.notifyReceiveAck()进行处理,当判断消息类型为CONNACK时,会调用restoreInflightMessage()处理出站消息队列中的消息,具体实现如下 private void restoreInflightMessages() { pendingMessages = new Vector(this.maxInflight); pendingFlows = new Vector(); Enumeration keys = outboundQoS2.keys(); // 取出outboundQoS2队列中的消息取出根据类型放到pendingMessages或 // pendingFlows中,并且优先级设置为最高 while (keys.hasMoreElements()) { Object key = keys.nextElement(); MqttWireMessage msg = (MqttWireMessage) outboundQoS2.get(key); if (msg instanceof MqttPublish) { msg.setDuplicate(true); insertInOrder(pendingMessages, (MqttPublish)msg); } else if (msg instanceof MqttPubRel) { insertInOrder(pendingFlows, (MqttPubRel)msg); } } // 取出outboundQoS1队列中的消息取出根据类型放到pendingMessages或 // pendingFlows中,并且优先级设置为最高 keys = outboundQoS1.keys(); while (keys.hasMoreElements()) { Object key = keys.nextElement(); MqttPublish msg = (MqttPublish)outboundQoS1.get(key); msg.setDuplicate(true); insertInOrder(pendingMessages, msg); } keys = outboundQoS0.keys(); // 取出outboundQoS0队列中的消息取出根据类型放到pendingMessages或 // pendingFlows中,并且优先级设置为最高 while(keys.hasMoreElements()){ Object key = keys.nextElement(); MqttPublish msg = (MqttPublish)outboundQoS0.get(key); insertInOrder(pendingMessages, msg); } // 最后使用排序法,将消息根据messageId进行排序 this.pendingFlows = reOrder(pendingFlows); this.pendingMessages = reOrder(pendingMessages); }最后附上消息重发的流程图通过五个章节的讲解,小伙伴们应该对MQTT协议以及针对Java的客户端实现Eclipse.Paho中间件有了一个比较清晰地了解,当然在使用过程当中,还是会有一些边边角角的问题,包括MQTT作为数据传输时的高并发和海量数据的应对和处理,不同的业务场景也会对应不同的架构设计,这里我也是抛砖引玉,还有很多更优秀的方案等待我们去探索,继续加油吧~~~完结~撒花~~ MQTT系列---入门介绍 MQTT系列---Java端实现消息发布与订阅 MQTT之Eclipse.Paho源码(一)--建立连接 MQTT系列-Eclipse.Paho源码分析(二)-消息的发送与接收 |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |